/*
 Licensed to Diennea S.r.l. under one
 or more contributor license agreements. See the NOTICE file
 distributed with this work for additional information
 regarding copyright ownership. Diennea S.r.l. licenses this file
 to you under the Apache License, Version 2.0 (the
 "License"); you may not use this file except in compliance
 with the License.  You may obtain a copy of the License at

 http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing,
 software distributed under the License is distributed on an
 "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.

 */

package herddb.server;

import static herddb.proto.PduCodec.ObjectListReader.isDontKeepReadLocks;
import static herddb.proto.PduCodec.TxCommand.TX_COMMAND_BEGIN_TRANSACTION;
import static herddb.proto.PduCodec.TxCommand.TX_COMMAND_COMMIT_TRANSACTION;
import static herddb.proto.PduCodec.TxCommand.TX_COMMAND_ROLLBACK_TRANSACTION;
import herddb.backup.DumpedLogEntry;
import herddb.client.ClientConfiguration;
import herddb.client.DMLResult;
import herddb.client.HDBException;
import herddb.client.ScanResultSet;
import herddb.codec.RecordSerializer;
import herddb.core.HerdDBInternalException;
import herddb.core.RunningStatementInfo;
import herddb.core.RunningStatementsStats;
import herddb.core.TableManager;
import herddb.core.TableSpaceManager;
import herddb.core.stats.ConnectionsInfo;
import herddb.log.LogSequenceNumber;
import herddb.model.DDLStatementExecutionResult;
import herddb.model.DMLStatementExecutionResult;
import herddb.model.DataConsistencyStatementResult;
import herddb.model.DataScanner;
import herddb.model.DataScannerException;
import herddb.model.DuplicatePrimaryKeyException;
import herddb.model.GetResult;
import herddb.model.Index;
import herddb.model.NotLeaderException;
import herddb.model.Record;
import herddb.model.ScanResult;
import herddb.model.Statement;
import herddb.model.StatementEvaluationContext;
import herddb.model.StatementExecutionException;
import herddb.model.StatementExecutionResult;
import herddb.model.Table;
import herddb.model.TableAwareStatement;
import herddb.model.Transaction;
import herddb.model.TransactionContext;
import herddb.model.TransactionResult;
import herddb.model.commands.BeginTransactionStatement;
import herddb.model.commands.CommitTransactionStatement;
import herddb.model.commands.RollbackTransactionStatement;
import herddb.model.commands.SQLPlannedOperationStatement;
import herddb.model.commands.ScanStatement;
import herddb.network.Channel;
import herddb.network.ChannelEventListener;
import herddb.network.ServerSideConnection;
import herddb.proto.Pdu;
import herddb.proto.PduCodec;
import herddb.security.sasl.SaslNettyServer;
import herddb.sql.TranslatedQuery;
import herddb.utils.Bytes;
import herddb.utils.DataAccessor;
import herddb.utils.RawString;
import herddb.utils.TuplesList;
import io.netty.buffer.ByteBuf;
import java.io.EOFException;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.calcite.tools.ValidationException;

/**
 * Handles a client Connection
 *
 * @author enrico.olivelli
 */
public class ServerSideConnectionPeer implements ServerSideConnection, ChannelEventListener {

    /** Logger shared by all the peers. */
    private static final Logger LOGGER = Logger.getLogger(ServerSideConnectionPeer.class.getName());
    // NOTE(review): not referenced by the handlers visible in this part of the file;
    // presumably used further down - confirm before removing.
    private static final RawString RAWSTRING_KEY = RawString.of("_key");
    /** Shared counter from which every peer draws its own id. */
    private static final AtomicLong IDGENERATOR = new AtomicLong();
    /** Id of this connection, exposed by {@link #getConnectionId()}. */
    private final long id = IDGENERATOR.incrementAndGet();
    /** Channel this peer is bound to; the peer registers itself on it as messages receiver. */
    private final Channel channel;
    /** Owning server, used to reach the manager that executes the requests. */
    private final Server server;
    /** Cache used to resolve the text of a query from a (tablespace, statement id) pair. */
    private final ServerSidePreparedStatementCache preparedStatements;
    /**
     * Open scanners. The ID is generated by the client. On the client side each
     * Scanner is bound to the socket (NettyChannel)
     */
    private final ConcurrentMap<Long, ServerSideScannerPeer> scanners = new ConcurrentHashMap<>();
    /** When false, every request other than the SASL handshake is answered with an error. */
    private volatile boolean authenticated;
    // NOTE(review): assigned outside the visible part of the file, presumably by the SASL handlers.
    private volatile SaslNettyServer saslNettyServer;
    /** Remote address of the channel, captured at construction time. */
    private final String address;
    /** Name of the connected user; empty until it is set (the constructor sets the default user for local channels in local mode). */
    private volatile String username = "";
    /** Wall-clock timestamp (milliseconds) taken when this peer was created. */
    private final long connectionTs = System.currentTimeMillis();

    /**
     * Builds the peer for a newly accepted channel and registers it as the
     * receiver of the messages coming from that channel.
     * <p>
     * In "local" mode a local channel is considered authenticated from the
     * start, with the default client username.
     *
     * @param channel the channel to the client
     * @param server the server that owns this connection
     */
    public ServerSideConnectionPeer(Channel channel, Server server) {
        this.channel = channel;
        this.server = server;
        this.address = channel.getRemoteAddress();
        this.preparedStatements = server.getManager().getPreparedStatementsCache();
        // no need to perform auth in "local" mode
        boolean localMode = ServerConfiguration.PROPERTY_MODE_LOCAL.equals(server.getManager().getMode());
        if (localMode && channel.isLocalChannel()) {
            authenticated = true;
            username = ClientConfiguration.PROPERTY_CLIENT_USERNAME_DEFAULT;
        }
        // Register as receiver only as the very last step: as soon as the channel
        // knows about this peer a request may be dispatched to it (possibly from
        // another thread) and the handlers need 'server', 'preparedStatements'
        // and the authentication state to be already in place.
        this.channel.setMessagesReceiver(this);
    }

    /**
     * Returns the id assigned to this connection when the peer was created.
     */
    @Override
    public long getConnectionId() {
        return this.id;
    }

    /**
     * Dispatches a request to the handler that matches its type.
     * <p>
     * SASL handshake messages are always served; every other known type
     * requires the connection to be authenticated, otherwise an error is
     * sent back. Unknown types are answered with an error reply.
     * <p>
     * The message is closed here when the method returns, except for
     * statement execution, statement preparation, batch execution and
     * transaction commands: for those the handler is in charge of closing it.
     *
     * @param message the request
     * @param channel the channel on which replies are sent
     */
    @Override
    public void requestReceived(Pdu message, Channel channel) {
        // message is handled by current thread, unless a handler takes it over
        boolean closeOnReturn = true;
        try {
            LOGGER.log(Level.FINEST, "messageReceived {0}", message);

            // step 1: serve what does not need authentication and reject unknown types
            switch (message.type) {
                case Pdu.TYPE_SASL_TOKEN_MESSAGE_REQUEST:
                    handleSaslTokenMessageRequest(message, channel);
                    return;
                case Pdu.TYPE_SASL_TOKEN_MESSAGE_TOKEN:
                    handleSaslTokenMessage(message, channel);
                    return;
                case Pdu.TYPE_EXECUTE_STATEMENT:
                case Pdu.TYPE_PREPARE_STATEMENT:
                case Pdu.TYPE_EXECUTE_STATEMENTS:
                case Pdu.TYPE_TX_COMMAND:
                case Pdu.TYPE_OPENSCANNER:
                case Pdu.TYPE_FETCHSCANNERDATA:
                case Pdu.TYPE_CLOSESCANNER:
                case Pdu.TYPE_REQUEST_TABLESPACE_DUMP:
                case Pdu.TYPE_REQUEST_TABLE_RESTORE:
                case Pdu.TYPE_PUSH_TABLE_DATA:
                case Pdu.TYPE_TABLE_RESTORE_FINISHED:
                case Pdu.TYPE_RESTORE_FINISHED:
                case Pdu.TYPE_PUSH_TXLOGCHUNK:
                case Pdu.TYPE_PUSH_TRANSACTIONSBLOCK:
                    // known type, handled in step 3
                    break;
                default:
                    channel.sendReplyMessage(message.messageId,
                            PduCodec.ErrorResponse.write(message.messageId, "unsupported message type " + message.type));
                    return;
            }

            // step 2: everything left requires an authenticated connection
            if (!authenticated) {
                sendAuthRequiredError(channel, message);
                return;
            }

            // step 3: actual dispatch
            switch (message.type) {
                case Pdu.TYPE_EXECUTE_STATEMENT:
                    closeOnReturn = false;
                    handleExecuteStatement(message, channel);
                    break;
                case Pdu.TYPE_PREPARE_STATEMENT:
                    closeOnReturn = false;
                    handlePrepareStatement(message, channel);
                    break;
                case Pdu.TYPE_EXECUTE_STATEMENTS:
                    closeOnReturn = false;
                    handleExecuteStatements(message, channel);
                    break;
                case Pdu.TYPE_TX_COMMAND:
                    closeOnReturn = false;
                    handleTxCommand(message, channel);
                    break;
                case Pdu.TYPE_OPENSCANNER:
                    handleOpenScanner(message, channel);
                    break;
                case Pdu.TYPE_FETCHSCANNERDATA:
                    handleFetchScannerData(message, channel);
                    break;
                case Pdu.TYPE_CLOSESCANNER:
                    handleCloseScanner(message, channel);
                    break;
                case Pdu.TYPE_REQUEST_TABLESPACE_DUMP:
                    handleRequestTablespaceDump(message, channel);
                    break;
                case Pdu.TYPE_REQUEST_TABLE_RESTORE:
                    handleRequestTableRestore(message, channel);
                    break;
                case Pdu.TYPE_PUSH_TABLE_DATA:
                    handlePushTableData(message, channel);
                    break;
                case Pdu.TYPE_TABLE_RESTORE_FINISHED:
                    handleTableRestoreFinished(message, channel);
                    break;
                case Pdu.TYPE_RESTORE_FINISHED:
                    handleRestoreFinished(message, channel);
                    break;
                case Pdu.TYPE_PUSH_TXLOGCHUNK:
                    handlePushTxLogChunk(message, channel);
                    break;
                case Pdu.TYPE_PUSH_TRANSACTIONSBLOCK:
                    handlePushTransactionsBlock(message, channel);
                    break;
                default:
                    // not reachable: unknown types have been rejected in step 1
                    break;
            }
        } finally {
            if (closeOnReturn) {
                message.close();
            }
        }
    }

    /**
     * Starts the restore of a table: the dumped table definition is re-bound
     * to the tablespace named in the request and handed to the manager of that
     * tablespace together with the log position of the dump.
     * Replies with an ack, or with an error if a
     * {@link StatementExecutionException} is raised.
     */
    private void handleRequestTableRestore(Pdu message, Channel channel) {
        try {
            long ledgerId = PduCodec.RequestTableRestore.readLedgerId(message);
            long offset = PduCodec.RequestTableRestore.readOffset(message);
            String tableSpace = PduCodec.RequestTableRestore.readTablespace(message);
            byte[] serializedTable = PduCodec.RequestTableRestore.readTableDefinition(message);
            Table dumpedTable = Table.deserialize(serializedTable);
            // same definition, but living in the tablespace that is being restored
            Table restoredTable = Table.builder().cloning(dumpedTable).tablespace(tableSpace).build();
            server.getManager()
                    .getTableSpaceManager(tableSpace)
                    .beginRestoreTable(restoredTable.serialize(), new LogSequenceNumber(ledgerId, offset));
            channel.sendReplyMessage(message.messageId, PduCodec.AckResponse.write(message.messageId));
        } catch (StatementExecutionException err) {
            channel.sendReplyMessage(message.messageId, composeErrorResponse(message.messageId, err));
        }
    }

    /**
     * Builds the error reply for a failed request, flagging it as a
     * "not leader" error when the failure is a {@link NotLeaderException}.
     *
     * @param messageId id of the request that failed
     * @param err the failure
     * @return the encoded error response
     */
    private static ByteBuf composeErrorResponse(long messageId, Throwable err) {
        boolean notLeader = err instanceof NotLeaderException;
        return PduCodec.ErrorResponse.write(messageId, err, notLeader, false);
    }

    /**
     * Completes the restore of a table: decodes the index definitions carried
     * by the request and passes them to the manager of the tablespace.
     * Replies with an ack, or with an error if a
     * {@link StatementExecutionException} is raised.
     */
    private void handleTableRestoreFinished(Pdu message, Channel channel) {
        try {
            String tableSpace = PduCodec.TableRestoreFinished.readTablespace(message);
            String table = PduCodec.TableRestoreFinished.readTableName(message);
            List<byte[]> serializedIndexes = PduCodec.TableRestoreFinished.readIndexesDefinition(message);
            List<Index> indexes = new ArrayList<>(serializedIndexes.size());
            serializedIndexes.forEach(serialized -> indexes.add(Index.deserialize(serialized)));
            LOGGER.log(Level.INFO, "tableRestoreFinished, table {0}, with {1} indexes", new Object[]{table, indexes.size()});

            server.getManager().getTableSpaceManager(tableSpace).restoreTableFinished(table, indexes);

            channel.sendReplyMessage(message.messageId, PduCodec.AckResponse.write(message.messageId));
        } catch (StatementExecutionException err) {
            channel.sendReplyMessage(message.messageId, composeErrorResponse(message.messageId, err));
        }
    }

    /**
     * Tells the manager of the tablespace named in the request that the
     * restore is complete. Replies with an ack, or with an error if a
     * {@link StatementExecutionException} is raised.
     */
    private void handleRestoreFinished(Pdu message, Channel channel) {
        try {
            // NOTE(review): the tablespace is read with the TableRestoreFinished codec;
            // presumably both messages carry it in the same position - confirm against PduCodec.
            String tableSpace = PduCodec.TableRestoreFinished.readTablespace(message);

            server.getManager().getTableSpaceManager(tableSpace).restoreFinished();

            channel.sendReplyMessage(message.messageId, PduCodec.AckResponse.write(message.messageId));
        } catch (StatementExecutionException err) {
            channel.sendReplyMessage(message.messageId, composeErrorResponse(message.messageId, err));
        }
    }

    /**
     * Receives a block of records of a table under restore and writes them
     * through the table manager. Replies with an ack, or with an error if a
     * {@link StatementExecutionException} is raised.
     */
    private void handlePushTableData(Pdu message, Channel channel) {
        try {
            String tableSpace = PduCodec.PushTableData.readTablespace(message);
            String table = PduCodec.PushTableData.readTablename(message);

            // the measured time covers both decoding and writing
            long startTs = System.currentTimeMillis();
            List<Record> records = new ArrayList<>();
            PduCodec.PushTableData.readRecords(
                    message,
                    (rawKey, rawValue) -> records.add(new Record(Bytes.from_array(rawKey), Bytes.from_array(rawValue))));

            LOGGER.log(Level.INFO, "Received {0} records for restore of table {1} in tableSpace {2}", new Object[]{records.size(), table, tableSpace});
            TableManager tableManager = (TableManager) server.getManager()
                    .getTableSpaceManager(tableSpace)
                    .getTableManager(table);
            tableManager.writeFromDump(records);
            long elapsed = System.currentTimeMillis() - startTs;
            LOGGER.log(Level.INFO, "Time restore {0} records: data {1} ms", new Object[]{records.size(), elapsed});
            channel.sendReplyMessage(message.messageId, PduCodec.AckResponse.write(message.messageId));
        } catch (StatementExecutionException err) {
            channel.sendReplyMessage(message.messageId, composeErrorResponse(message.messageId, err));
        }
    }

    /**
     * Receives a chunk of dumped transaction log entries and passes them to
     * the manager of the tablespace under restore. Replies with an ack, or
     * with an error if a {@link StatementExecutionException} or an
     * {@link EOFException} is raised.
     */
    private void handlePushTxLogChunk(Pdu message, Channel channel) {
        try {
            String tableSpace = PduCodec.PushTxLogChunk.readTablespace(message);

            List<DumpedLogEntry> logEntries = new ArrayList<>();
            PduCodec.PushTxLogChunk.readRecords(
                    message,
                    (rawLsn, payload) -> logEntries.add(new DumpedLogEntry(LogSequenceNumber.deserialize(rawLsn), payload)));

            LOGGER.log(Level.INFO, "Received {0} records for restore of txlog in tableSpace {1}", new Object[]{logEntries.size(), tableSpace});

            server.getManager().getTableSpaceManager(tableSpace).restoreRawDumpedEntryLogs(logEntries);

            channel.sendReplyMessage(message.messageId, PduCodec.AckResponse.write(message.messageId));
        } catch (StatementExecutionException | EOFException err) {
            channel.sendReplyMessage(message.messageId, composeErrorResponse(message.messageId, err));
        }
    }

    /**
     * Receives a block of dumped transactions and passes them to the manager
     * of the tablespace under restore. Replies with an ack, or with an error
     * if a {@link StatementExecutionException} is raised.
     */
    private void handlePushTransactionsBlock(Pdu message, Channel channel) {
        try {
            String tableSpace = PduCodec.PushTransactionsBlock.readTablespace(message);

            List<Transaction> transactions = new ArrayList<>();
            PduCodec.PushTransactionsBlock.readTransactions(
                    message,
                    (serialized) -> transactions.add(Transaction.deserialize(tableSpace, serialized)));

            LOGGER.log(Level.INFO, "Received " + transactions.size() + " records for restore of transactions in tableSpace " + tableSpace);

            server.getManager()
                    .getTableSpaceManager(tableSpace)
                    .restoreRawDumpedTransactions(transactions);

            channel.sendReplyMessage(message.messageId, PduCodec.AckResponse.write(message.messageId));
        } catch (StatementExecutionException err) {
            channel.sendReplyMessage(message.messageId, composeErrorResponse(message.messageId, err));
        }
    }

    /**
     * Opens a scanner: plans and executes the query (given as text or as the
     * id of a prepared statement) and sends back the first chunk of the
     * result set.
     * <p>
     * If the first chunk does not exhaust the result set, the scanner is
     * registered under the id chosen by the client so that further chunks can
     * be fetched; otherwise it is closed right after the reply. Whenever the
     * first chunk cannot be produced or delivered, the scanner is closed and
     * an error reply is sent.
     *
     * @param message the open scanner request
     * @param channel the channel on which the reply is sent
     */
    private void handleOpenScanner(Pdu message, Channel channel) {

        long txId = PduCodec.OpenScanner.readTx(message);

        String tableSpace = PduCodec.OpenScanner.readTablespace(message);
        long statementId = PduCodec.OpenScanner.readStatementId(message);
        String query =
                statementId > 0 ? preparedStatements.resolveQuery(tableSpace, statementId)
                        : PduCodec.OpenScanner.readQuery(message);
        if (query == null) {
            ByteBuf error = PduCodec.ErrorResponse.writeMissingPreparedStatementError(message.messageId, "bad statement id: " + statementId);
            channel.sendReplyMessage(message.messageId, error);
            return;
        }

        long scannerId = PduCodec.OpenScanner.readScannerId(message);

        int fetchSize = PduCodec.OpenScanner.readFetchSize(message);
        if (fetchSize <= 0) {
            fetchSize = 10;
        }
        int maxRows = PduCodec.OpenScanner.readMaxRows(message); // default 0

        PduCodec.ObjectListReader parametersReader = PduCodec.OpenScanner.startReadParameters(message);
        List<Object> parameters = new ArrayList<>(parametersReader.getNumParams());
        for (int i = 0; i < parametersReader.getNumParams(); i++) {
            parameters.add(parametersReader.nextObject());
        }
        // with clients older than 0.20.0 keepReadLocks will be always true
        byte trailer = parametersReader.readTrailer();
        boolean keepReadLocks = !isDontKeepReadLocks(trailer);
        if (LOGGER.isLoggable(Level.FINER)) {
            LOGGER.log(Level.FINER, "openScanner txId+" + txId + ", fetchSize " + fetchSize + ", maxRows " + maxRows + ", keepReadLocks " + keepReadLocks + ", " + query + " with " + parameters);
        }
        RunningStatementsStats runningStatements = server.getManager().getRunningStatements();
        RunningStatementInfo statementInfo = new RunningStatementInfo(query,
                System.currentTimeMillis(), tableSpace, "", 1);
        try {
            TranslatedQuery translatedQuery = server
                    .getManager()
                    .getPlanner().translate(tableSpace,
                            query, parameters, true, true, false, maxRows);
            translatedQuery.context.setForceRetainReadLock(keepReadLocks);

            if (LOGGER.isLoggable(Level.FINEST)) {
                LOGGER.log(Level.FINEST, "{0} -> {1}", new Object[]{query, translatedQuery.plan.mainStatement});
            }

            TransactionContext transactionContext = new TransactionContext(txId);
            if (translatedQuery.plan.mainStatement instanceof SQLPlannedOperationStatement
                    || translatedQuery.plan.mainStatement instanceof ScanStatement) {
                runningStatements.registerRunningStatement(statementInfo);
                ScanResult scanResult = (ScanResult) server.getManager().executePlan(translatedQuery.plan, translatedQuery.context, transactionContext);
                DataScanner dataScanner = scanResult.dataScanner;

                ServerSideScannerPeer scanner = new ServerSideScannerPeer(dataScanner);

                boolean last = false;
                boolean delivered = false;
                try {
                    String[] columns = dataScanner.getFieldNames();
                    List<DataAccessor> records = dataScanner.consume(fetchSize);
                    TuplesList tuplesList = new TuplesList(columns, records);
                    last = dataScanner.isFinished();
                    if (LOGGER.isLoggable(Level.FINEST)) {
                        LOGGER.log(Level.FINEST, "sending first {0} records to scanner {1} query {2}", new Object[]{records.size(), scannerId, query});
                    }
                    if (!last) {
                        scanners.put(scannerId, scanner);
                    }
                    ByteBuf result = PduCodec.ResultSetChunk.write(message.messageId, tuplesList, last, dataScanner.getTransactionId());
                    channel.sendReplyMessage(message.messageId, result);
                    delivered = true;
                } finally {
                    if (!delivered) {
                        // The first chunk could not be read, serialized or sent:
                        // the client will never fetch from this scanner, so do not
                        // leave it open nor registered. The failure itself keeps
                        // propagating to the catch block below.
                        scanners.remove(scannerId, scanner);
                        scanner.close();
                    }
                }
                if (last) {
                    // no need to hold the scanner anymore
                    scanner.close();
                }
            } else {
                ByteBuf error = PduCodec.ErrorResponse.write(message.messageId, "unsupported query type for scan " + query + ": PLAN is " + translatedQuery.plan);
                channel.sendReplyMessage(message.messageId, error);
            }
        } catch (DataScannerException | HerdDBInternalException err) {
            // instanceof is already false for a null cause
            if (err.getCause() instanceof ValidationException) {
                // no stacktraces for bad queries
                LOGGER.log(Level.FINE, "SQL error on scanner " + scannerId + ": " + err);
            } else {
                LOGGER.log(Level.SEVERE, "error on scanner " + scannerId + ": " + err, err);
            }
            scanners.remove(scannerId);
            ByteBuf error = composeErrorResponse(message.messageId, err);
            channel.sendReplyMessage(message.messageId, error);
        } finally {
            // invoked even when the statement was never registered (error or unsupported plan)
            runningStatements.unregisterRunningStatement(statementInfo);
        }
    }

    /**
     * Sends the next chunk of records of an open scanner.
     * <p>
     * When the result set is exhausted the scanner is unregistered and, once
     * the reply has been sent, closed. If the chunk cannot be serialized or
     * sent the scanner is unregistered and closed as well. Every
     * {@link DataScannerException} or {@link HerdDBInternalException} is
     * reported to the client with an error reply, so that the client is never
     * left waiting for an answer.
     *
     * @param message the fetch request
     * @param channel the channel on which the reply is sent
     */
    private void handleFetchScannerData(Pdu message, Channel channel) {
        long scannerId = PduCodec.FetchScannerData.readScannerId(message);
        int fetchSize = PduCodec.FetchScannerData.readFetchSize(message);
        if (fetchSize <= 0) {
            fetchSize = 10;
        }
        ServerSideScannerPeer scanner = scanners.get(scannerId);
        if (scanner == null) {
            ByteBuf error = PduCodec.ErrorResponse.write(message.messageId, "no such scanner " + scannerId);
            channel.sendReplyMessage(message.messageId, error);
            return;
        }
        try {
            DataScanner dataScanner = scanner.getScanner();
            List<DataAccessor> records = dataScanner.consume(fetchSize);
            String[] columns = dataScanner.getFieldNames();
            TuplesList tuplesList = new TuplesList(columns, records);

            boolean last = false;
            if (dataScanner.isFinished()) {
                LOGGER.log(Level.FINEST, "unregistering scanner {0}, resultset is finished", scannerId);
                scanners.remove(scannerId);
                last = true;
            }
            try {
                ByteBuf result = PduCodec.ResultSetChunk.write(message.messageId, tuplesList, last, dataScanner.getTransactionId());
                channel.sendReplyMessage(message.messageId, result);
            } catch (HerdDBInternalException err) {
                // do not leak an unserializable scanner
                scanners.remove(scannerId);
                scanner.close();
                throw err;
            }
            if (last) {
                dataScanner.close();
            }
        } catch (DataScannerException | HerdDBInternalException err) {
            // HerdDBInternalException (and not only its StatementExecutionException
            // flavour) must be handled here: the inner catch rethrows it and the
            // client is still waiting for a reply to this request
            ByteBuf error = composeErrorResponse(message.messageId, err);
            channel.sendReplyMessage(message.messageId, error);
        }
    }

    /**
     * Unregisters the scanner named in the request and, if it was registered,
     * closes it on behalf of the client. No reply is sent by this handler.
     */
    private void handleCloseScanner(Pdu message, Channel channel) {
        long scannerId = PduCodec.CloseScanner.readScannerId(message);
        ServerSideScannerPeer scanner = scanners.remove(scannerId);
        if (scanner == null) {
            // unknown or already released scanner: nothing to do
            return;
        }
        if (LOGGER.isLoggable(Level.FINER)) {
            LOGGER.log(Level.FINER, "remove scanner {0} as requested by client", scannerId);
        }
        scanner.clientClose();
    }

    /**
     * Replies to a request received on a connection that is not authenticated
     * with an error that names the channel of this peer.
     */
    private void sendAuthRequiredError(Channel channel, Pdu message) {
        String reason = "autentication required (client " + this.channel + ")";
        channel.sendReplyMessage(message.messageId, PduCodec.ErrorResponse.write(message.messageId, reason));
    }

    /**
     * Decodes a tablespace dump request and delegates the dump to the manager,
     * which receives the request and the channel. A non positive fetch size
     * is replaced by 10.
     */
    private void handleRequestTablespaceDump(Pdu message, Channel channel) {
        String dumpId = PduCodec.RequestTablespaceDump.readDumpId(message);
        int requestedFetchSize = PduCodec.RequestTablespaceDump.readFetchSize(message);
        int fetchSize = requestedFetchSize <= 0 ? 10 : requestedFetchSize;
        String tableSpace = PduCodec.RequestTablespaceDump.readTablespace(message);
        boolean withTransactionLog = PduCodec.RequestTablespaceDump.readInludeTransactionLog(message);
        server.getManager().dumpTableSpace(tableSpace, dumpId, message, channel, fetchSize, withTransactionLog);
    }

    /**
     * Executes the same statement once for every set of parameters carried by
     * the request (a batch) and replies with the update count, and optionally
     * the generated values, of each execution.
     * <p>
     * All the executions are planned up front, then they are run one after
     * the other: each one is started from the completion callback of the
     * previous one, using the transaction id returned by it. The first failure
     * stops the chain and is reported to the client.
     * <p>
     * This method owns the request: on every path that ends the processing
     * the message is closed and the running statement entry is released, even
     * if the final reply cannot be written or sent.
     *
     * @param message the request; closed by this method or by its callbacks
     * @param channel the channel on which the reply is sent
     */
    private void handleExecuteStatements(Pdu message, Channel channel) {
        long transactionId = PduCodec.ExecuteStatements.readTx(message);
        String tableSpace = PduCodec.ExecuteStatements.readTablespace(message);
        long statementId = PduCodec.ExecuteStatements.readStatementId(message);
        String query =
                statementId > 0 ? preparedStatements.resolveQuery(tableSpace, statementId)
                        : PduCodec.ExecuteStatements.readQuery(message);
        if (query == null) {
            ByteBuf error = PduCodec.ErrorResponse.writeMissingPreparedStatementError(message.messageId, "bad statement id: " + statementId);
            channel.sendReplyMessage(message.messageId, error);
            message.close();
            return;
        }
        boolean returnValues = PduCodec.ExecuteStatements.readReturnValues(message);
        PduCodec.ListOfListsReader statementParameters = PduCodec.ExecuteStatements.startReadStatementsParameters(message);
        int numStatements = statementParameters.getNumLists();
        List<List<Object>> batch = new ArrayList<>(numStatements);
        for (int i = 0; i < numStatements; i++) {
            PduCodec.ObjectListReader parametersReader = statementParameters.nextList();
            List<Object> batchParams = new ArrayList<>(parametersReader.getNumParams());
            for (int j = 0; j < parametersReader.getNumParams(); j++) {
                batchParams.add(parametersReader.nextObject());
            }
            batch.add(batchParams);
        }
        RunningStatementsStats runningStatements = server.getManager().getRunningStatements();
        // NOTE(review): this entry is only ever unregistered in this method, no
        // registerRunningStatement call is visible here - confirm this is intended
        RunningStatementInfo statementInfo = new RunningStatementInfo(query, System.currentTimeMillis(), tableSpace, "", numStatements);
        try {

            List<TranslatedQuery> queries = new ArrayList<>();
            for (int i = 0; i < numStatements; i++) {
                List<Object> parameters = batch.get(i);
                TranslatedQuery translatedQuery = server
                        .getManager()
                        .getPlanner().translate(tableSpace, query,
                                parameters, false, true, returnValues, -1);
                queries.add(translatedQuery);
            }

            List<Long> updateCounts = new CopyOnWriteArrayList<>();
            List<Map<String, Object>> otherDatas = new CopyOnWriteArrayList<>();

            /**
             * Completion callback of the execution number 'current' (1-based):
             * records its outcome and starts the next execution, or sends the
             * final reply after the last one.
             */
            class ComputeNext implements BiConsumer<StatementExecutionResult, Throwable> {

                final int current;

                public ComputeNext(int current) {
                    this.current = current;
                }

                @Override
                public void accept(StatementExecutionResult result, Throwable error) {
                    if (error != null) {
                        ByteBuf errorMsg = composeErrorResponse(message.messageId, error);
                        channel.sendReplyMessage(message.messageId, errorMsg);
                        message.close();
                        runningStatements.unregisterRunningStatement(statementInfo);
                        return;
                    }
                    if (result instanceof DMLStatementExecutionResult) {
                        DMLStatementExecutionResult dml = (DMLStatementExecutionResult) result;
                        Map<String, Object> otherData = Collections.emptyMap();
                        if (returnValues && dml.getKey() != null) {
                            // 'current' is 1-based, the plan of this execution is at current - 1
                            TranslatedQuery translatedQuery = queries.get(current - 1);
                            Statement statement = translatedQuery.plan.mainStatement;
                            TableAwareStatement tableStatement = (TableAwareStatement) statement;
                            Table table = server.getManager().getTableSpaceManager(statement.getTableSpace()).getTableManager(tableStatement.getTable()).getTable();
                            Object key = RecordSerializer.deserializePrimaryKey(dml.getKey(), table);
                            otherData = new HashMap<>();
                            otherData.put("_key", key);
                            if (dml.getNewvalue() != null) {
                                Map<String, Object> newvalue = RecordSerializer.toBean(new Record(dml.getKey(), dml.getNewvalue()), table);
                                otherData.putAll(newvalue);
                            }
                        }
                        updateCounts.add((long) dml.getUpdateCount());
                        otherDatas.add(otherData);
                    } else if (result instanceof DDLStatementExecutionResult) {
                        Map<String, Object> otherData = Collections.emptyMap();
                        updateCounts.add(1L);
                        otherDatas.add(otherData);
                    } else {
                        ByteBuf response = PduCodec.ErrorResponse.write(message.messageId, "bad result type " + result.getClass() + " (" + result + ")");
                        channel.sendReplyMessage(message.messageId, response);
                        message.close();
                        runningStatements.unregisterRunningStatement(statementInfo);
                        return;
                    }

                    long newTransactionId = result.transactionId;
                    if (current == queries.size()) {
                        // last execution of the batch: send the overall result
                        try {
                            ByteBuf response = PduCodec.ExecuteStatementsResult.write(message.messageId, updateCounts, otherDatas, newTransactionId);
                            channel.sendReplyMessage(message.messageId, response);
                        } catch (Throwable t) {
                            LOGGER.log(Level.SEVERE, "Internal error", t);
                        } finally {
                            // release the request even if the reply could not be written or sent
                            message.close();
                            runningStatements.unregisterRunningStatement(statementInfo);
                        }
                        return;
                    }

                    TranslatedQuery nextPlannedQuery = queries.get(current);
                    TransactionContext transactionContext = new TransactionContext(newTransactionId);
                    CompletableFuture<StatementExecutionResult> nextPromise =
                            server.getManager().executePlanAsync(nextPlannedQuery.plan, nextPlannedQuery.context, transactionContext);
                    nextPromise.whenComplete(new ComputeNext(current + 1));
                }
            }

            TransactionContext transactionContext = new TransactionContext(transactionId);
            TranslatedQuery firstTranslatedQuery = queries.get(0);
            server.getManager().executePlanAsync(firstTranslatedQuery.plan, firstTranslatedQuery.context, transactionContext)
                    .whenComplete(new ComputeNext(1));

        } catch (RuntimeException err) {
            // Any synchronous failure (planning errors, but also for instance an
            // empty batch) must be answered and must release the request: the
            // dispatcher has already given up the ownership of the message.
            ByteBuf response = composeErrorResponse(message.messageId, err);
            channel.sendReplyMessage(message.messageId, response);
            message.close();
            runningStatements.unregisterRunningStatement(statementInfo);
        }
    }

    /**
     * Executes a single-record lookup in "local" mode.
     * This method is like {@link #handleExecuteStatement(herddb.proto.Pdu, herddb.network.Channel) } but
     * we do not want here to marshal/unmarshal values, in order to save resources.
     * The calling thread blocks until the asynchronous execution completes.
     *
     * @param tablespace tablespace handed to the planner
     * @param query SQL text to translate and execute
     * @param txId id used to build the {@link TransactionContext} of the execution
     * @param parameters statement parameters, normalized before being handed to the planner
     * @return a result without record when nothing is found, otherwise the record keyed by {@link RawString};
     * in both cases it carries the transaction id reported by the execution
     * @throws HDBException wrapping whatever failure is raised while translating, executing or waiting
     */
    public herddb.client.GetResult executeGet(String tablespace, String query, long txId, List<Object> parameters) throws HDBException {
        // ensure we are dealing with the same data types that we see when the request is coming from the wire
        parameters = PduCodec.normalizeParametersList(parameters);
        TransactionContext transactionContext = new TransactionContext(txId);
        try {
            TranslatedQuery translatedQuery = server.getManager().getPlanner().translate(tablespace,
                    query, parameters, false, true, true, -1);
            CompletableFuture<StatementExecutionResult> res = server
                    .getManager()
                    .executePlanAsync(translatedQuery.plan, translatedQuery.context, transactionContext);
            CompletableFuture<herddb.client.GetResult> finalResult = res.handle((result, err) -> {
                if (err != null) {
                    // look through the wrappers added by the CompletableFuture machinery
                    while (err instanceof CompletionException) {
                        err = err.getCause();
                    }
                    if (err instanceof DuplicatePrimaryKeyException) {
                        throw new CompletionException(new SQLIntegrityConstraintViolationException(err));
                    } else {
                        throw new CompletionException(new SQLException(err));
                    }
                }
                if (result instanceof GetResult) {
                    GetResult get = (GetResult) result;
                    if (!get.found()) {
                        return new herddb.client.GetResult(null, get.transactionId);
                    } else {
                        Map<String, Object> record = get.getRecord().toBean(get.getTable());
                        // the client side result is keyed by RawString, not by String
                        Map<RawString, Object> recordForClient = new HashMap<>(record.size());
                        record.forEach((k, v) -> {
                            recordForClient.put(RawString.of(k), v);
                        });
                        return new herddb.client.GetResult(recordForClient, get.transactionId);
                    }
                } else {
                    throw new CompletionException(new SQLException("Unknown result type " + result.getClass() + ": " + result));
                }
            });
            return finalResult.get();
        } catch (Throwable err) {
            if (err instanceof InterruptedException) {
                // do not lose the interruption request: restore the flag before reporting the failure
                Thread.currentThread().interrupt();
            }
            while (err instanceof CompletionException) {
                err = err.getCause();
            }
            throw new HDBException(err);
        }
    }

    /**
     * Executes a single DML/DDL statement in "local" mode.
     * This method is like {@link #handleExecuteStatement(herddb.proto.Pdu, herddb.network.Channel) } but
     * we do not want here to marshal/unmarshal values, in order to save resources.
     * The calling thread blocks until the asynchronous execution completes.
     *
     * @param tablespace tablespace handed to the planner
     * @param query SQL text to translate and execute
     * @param txId id used to build the {@link TransactionContext} of the execution
     * @param returnValues when true, and the execution reports a key, the key and the new value
     * (if any) are returned to the caller
     * @param parameters statement parameters, normalized before being handed to the planner
     * @return update count, optional key/record and the transaction id reported by the execution
     * @throws HDBException wrapping whatever failure is raised while translating, executing or waiting
     */
    public DMLResult executeUpdate(String tablespace, String query, long txId, boolean returnValues, List<Object> parameters) throws HDBException {
        // ensure we are dealing with the same data types that we see when the request is coming from the wire
        parameters = PduCodec.normalizeParametersList(parameters);
        TransactionContext transactionContext = new TransactionContext(txId);
        try {
            TranslatedQuery translatedQuery = server.getManager().getPlanner().translate(tablespace,
                    query, parameters, false, true, returnValues, -1);
            Statement statement = translatedQuery.plan.mainStatement;
            CompletableFuture<StatementExecutionResult> res = server
                    .getManager()
                    .executePlanAsync(translatedQuery.plan, translatedQuery.context, transactionContext);
            CompletableFuture<DMLResult> finalResult = res.handle((result, err) -> {
                if (err != null) {
                    // look through the wrappers added by the CompletableFuture machinery
                    while (err instanceof CompletionException) {
                        err = err.getCause();
                    }
                    if (err instanceof DuplicatePrimaryKeyException) {
                        throw new CompletionException(new SQLIntegrityConstraintViolationException(err));
                    } else {
                        throw new CompletionException(new SQLException(err));
                    }
                }
                if (result instanceof DMLStatementExecutionResult) {
                    DMLStatementExecutionResult dml = (DMLStatementExecutionResult) result;

                    if (returnValues && dml.getKey() != null) {
                        // the table definition is needed to decode the raw key and value
                        TableAwareStatement tableStatement = statement.unwrap(TableAwareStatement.class);
                        Table table = server
                                .getManager()
                                .getTableSpaceManager(statement.getTableSpace()).getTableManager(tableStatement.getTable()).getTable();
                        final Map<RawString, Object> newRecord = new HashMap<>();
                        Object newKey = RecordSerializer.deserializePrimaryKey(dml.getKey(), table);
                        newRecord.put(RAWSTRING_KEY, newKey);
                        if (dml.getNewvalue() != null) {
                            Map<String, Object> toBean = RecordSerializer.toBean(new Record(dml.getKey(), dml.getNewvalue()), table);
                            toBean.forEach((k, v) -> {
                                newRecord.put(RawString.of(k), v);
                            });
                        }
                        return new DMLResult(dml.getUpdateCount(), newKey, newRecord, dml.transactionId);
                    } else {
                        return new DMLResult(dml.getUpdateCount(), null, null, dml.transactionId);
                    }
                } else if (result instanceof DDLStatementExecutionResult) {
                    // DDL statements are reported with an update count of 1
                    return new DMLResult(1, null, null, result.transactionId);
                } else {
                    throw new CompletionException(new SQLException("Unknown result type " + result.getClass() + ": " + result));
                }
            });
            return finalResult.get();
        } catch (Throwable err) {
            if (err instanceof InterruptedException) {
                // do not lose the interruption request: restore the flag before reporting the failure
                Thread.currentThread().interrupt();
            }
            while (err instanceof CompletionException) {
                err = err.getCause();
            }
            throw new HDBException(err);
        }
    }

    /**
     * Serves an "execute statement" request: decodes the request, translates the query, runs the plan
     * asynchronously and sends back either the result or an error response.
     * The message is closed on every path, either right away on early failures or at the end of the
     * completion callback.
     *
     * @param message the request to decode
     * @param channel the channel used to send the reply
     */
    private void handleExecuteStatement(Pdu message, Channel channel) {
        long txId = PduCodec.ExecuteStatement.readTx(message);
        String tablespace = PduCodec.ExecuteStatement.readTablespace(message);
        long statementId = PduCodec.ExecuteStatement.readStatementId(message);
        // a positive statement id refers to a previously prepared statement
        final String query;
        if (statementId > 0) {
            query = preparedStatements.resolveQuery(tablespace, statementId);
        } else {
            query = PduCodec.ExecuteStatement.readQuery(message);
        }
        if (query == null) {
            ByteBuf missing = PduCodec.ErrorResponse.writeMissingPreparedStatementError(message.messageId, "bad statement id: " + statementId);
            channel.sendReplyMessage(message.messageId, missing);
            message.close();
            return;
        }
        boolean returnValues = PduCodec.ExecuteStatement.readReturnValues(message);

        PduCodec.ObjectListReader paramsReader = PduCodec.ExecuteStatement.startReadParameters(message);
        int numParams = paramsReader.getNumParams();
        List<Object> parameters = new ArrayList<>(numParams);
        for (int p = 0; p < numParams; p++) {
            parameters.add(paramsReader.nextObject());
        }
        if (LOGGER.isLoggable(Level.FINEST)) {
            LOGGER.log(Level.FINEST, "query {0} with {1}", new Object[]{query, parameters});
        }

        RunningStatementInfo statementInfo = new RunningStatementInfo(query, System.currentTimeMillis(), tablespace, "", 1);
        TransactionContext transactionContext = new TransactionContext(txId);
        TranslatedQuery translatedQuery;
        try {
            translatedQuery = server.getManager().getPlanner().translate(tablespace,
                    query, parameters, false, true, returnValues, -1);
        } catch (StatementExecutionException planningError) {
            channel.sendReplyMessage(message.messageId, composeErrorResponse(message.messageId, planningError));
            message.close();
            return;
        }

        Statement statement = translatedQuery.plan.mainStatement;
        RunningStatementsStats runningStatements = server.getManager().getRunningStatements();
        runningStatements.registerRunningStatement(statementInfo);
        CompletableFuture<StatementExecutionResult> pending = server
                .getManager()
                .executePlanAsync(translatedQuery.plan, translatedQuery.context, transactionContext);
        pending.whenComplete((result, err) -> {
            try {
                runningStatements.unregisterRunningStatement(statementInfo);
                if (err != null) {
                    // look through the wrappers added by the CompletableFuture machinery
                    Throwable cause = err;
                    while (cause instanceof CompletionException) {
                        cause = cause.getCause();
                    }
                    ByteBuf error;
                    if (cause instanceof DuplicatePrimaryKeyException) {
                        error = PduCodec.ErrorResponse.writeSqlIntegrityConstraintsViolation(message.messageId, new SQLIntegrityConstraintViolationException(cause));
                    } else {
                        boolean expected = cause instanceof NotLeaderException
                                || cause instanceof StatementExecutionException;
                        if (!expected) {
                            // only failures of an unknown kind are worth a SEVERE log entry
                            LOGGER.log(Level.SEVERE, "unexpected error on query " + query + ", parameters: " + parameters + ":" + cause, cause);
                        }
                        error = composeErrorResponse(message.messageId, cause);
                    }
                    channel.sendReplyMessage(message.messageId, error);
                    return;
                }

                final ByteBuf reply;
                if (result instanceof DMLStatementExecutionResult) {
                    DMLStatementExecutionResult dml = (DMLStatementExecutionResult) result;
                    Map<String, Object> newRecord = null;
                    if (returnValues && dml.getKey() != null) {
                        // the table definition is needed to decode the raw key and value
                        TableAwareStatement tableStatement = statement.unwrap(TableAwareStatement.class);
                        Table table = server
                                .getManager()
                                .getTableSpaceManager(statement.getTableSpace())
                                .getTableManager(tableStatement.getTable())
                                .getTable();
                        newRecord = new HashMap<>();
                        newRecord.put("_key", RecordSerializer.deserializePrimaryKey(dml.getKey(), table));
                        if (dml.getNewvalue() != null) {
                            newRecord.putAll(RecordSerializer.toBean(new Record(dml.getKey(), dml.getNewvalue()), table));
                        }
                    }
                    reply = PduCodec.ExecuteStatementResult.write(
                            message.messageId, dml.getUpdateCount(), dml.transactionId, newRecord);
                } else if (result instanceof GetResult) {
                    GetResult get = (GetResult) result;
                    if (get.found()) {
                        Map<String, Object> record = get.getRecord().toBean(get.getTable());
                        reply = PduCodec.ExecuteStatementResult.write(
                                message.messageId, 1, get.transactionId, record);
                    } else {
                        reply = PduCodec.ExecuteStatementResult.write(
                                message.messageId, 0, get.transactionId, null);
                    }
                } else if (result instanceof TransactionResult) {
                    TransactionResult txresult = (TransactionResult) result;
                    reply = PduCodec.ExecuteStatementResult.write(
                            message.messageId, 1, txresult.getTransactionId(), null);
                } else if (result instanceof DDLStatementExecutionResult) {
                    DDLStatementExecutionResult ddl = (DDLStatementExecutionResult) result;
                    reply = PduCodec.ExecuteStatementResult.write(
                            message.messageId, 1, ddl.transactionId, null);
                } else if (result instanceof DataConsistencyStatementResult) {
                    reply = PduCodec.ExecuteStatementResult.write(
                            message.messageId, 0, 0, null);
                } else {
                    reply = PduCodec.ErrorResponse.write(message.messageId, "unknown result type:" + result);
                }
                channel.sendReplyMessage(message.messageId, reply);
            } finally {
                message.close();
            }
        });
    }

    /**
     * Serves a "prepare statement" request: registers the query and replies with its new id.
     * A "not leader" error is returned when the tablespace is not available on this node or this node
     * is not its leader. The message is always closed before returning.
     *
     * @param message the request to decode
     * @param channel the channel used to send the reply
     */
    private void handlePrepareStatement(Pdu message, Channel channel) {
        try {
            String query = PduCodec.PrepareStatement.readQuery(message);
            String tablespace = PduCodec.PrepareStatement.readTablespace(message);
            TableSpaceManager manager = server.getManager().getTableSpaceManager(tablespace);
            final ByteBuf reply;
            if (manager == null) {
                reply = PduCodec.ErrorResponse.writeNotLeaderError(message.messageId, "no such tablespace " + tablespace + " (at " + server.getManager().getNodeId() + ")");
            } else if (!manager.isLeader()) {
                reply = PduCodec.ErrorResponse.writeNotLeaderError(message.messageId, "not leader for " + tablespace);
            } else {
                long newId = preparedStatements.prepare(tablespace, query);
                reply = PduCodec.PrepareStatementResult.write(message.messageId, newId);
            }
            channel.sendReplyMessage(message.messageId, reply);
        } finally {
            message.close();
        }
    }

    /**
     * Rolls back a transaction, waiting for the statement to be executed.
     *
     * @param tableSpace tablespace the transaction belongs to
     * @param txId id of the transaction to roll back
     * @throws HDBException wrapping any {@link HerdDBInternalException} raised by the execution
     */
    public void rollbackTransaction(String tableSpace, long txId) throws HDBException {
        try {
            final RollbackTransactionStatement rollback = new RollbackTransactionStatement(tableSpace, txId);
            final TransactionContext context = new TransactionContext(txId);
            server.getManager().executeStatement(
                    rollback,
                    StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(),
                    context);
        } catch (HerdDBInternalException failure) {
            throw new HDBException(failure);
        }
    }

    /**
     * Commits a transaction, waiting for the statement to be executed.
     *
     * @param tableSpace tablespace the transaction belongs to
     * @param txId id of the transaction to commit
     * @throws HDBException wrapping any {@link HerdDBInternalException} raised by the execution
     */
    public void commitTransaction(String tableSpace, long txId) throws HDBException {
        try {
            final CommitTransactionStatement commit = new CommitTransactionStatement(tableSpace, txId);
            final TransactionContext context = new TransactionContext(txId);
            server.getManager().executeStatement(
                    commit,
                    StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(),
                    context);
        } catch (HerdDBInternalException failure) {
            throw new HDBException(failure);
        }
    }

    /**
     * Starts a new transaction on the given tablespace.
     *
     * @param tableSpace tablespace on which the transaction is started
     * @return the transaction id reported by the execution result
     * @throws HDBException wrapping any {@link HerdDBInternalException} raised by the execution
     */
    public long beginTransaction(String tableSpace) throws HDBException {
        try {
            final BeginTransactionStatement begin = new BeginTransactionStatement(tableSpace);
            // there is no transaction yet, the statement itself creates it
            final TransactionContext context = TransactionContext.NO_TRANSACTION;
            StatementExecutionResult outcome = server.getManager().executeStatement(
                    begin,
                    StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(),
                    context);
            return outcome.transactionId;
        } catch (HerdDBInternalException failure) {
            throw new HDBException(failure);
        }
    }

    /**
     * Serves a transaction command (begin, commit or rollback) and replies with the resulting
     * transaction id, or with an error response.
     * The message is closed on every path: immediately for an unknown command, otherwise at the end of
     * the completion callback.
     *
     * @param message the request to decode
     * @param channel the channel used to send the reply
     */
    private void handleTxCommand(Pdu message, Channel channel) {
        long txId = PduCodec.TxCommand.readTx(message);
        int type = PduCodec.TxCommand.readCommand(message);
        String tableSpace = PduCodec.TxCommand.readTablespace(message);
        TransactionContext transactionContext = new TransactionContext(txId);
        Statement statement;
        switch (type) {
            case TX_COMMAND_COMMIT_TRANSACTION:
                statement = new CommitTransactionStatement(tableSpace, txId);
                break;
            case TX_COMMAND_ROLLBACK_TRANSACTION:
                statement = new RollbackTransactionStatement(tableSpace, txId);
                break;
            case TX_COMMAND_BEGIN_TRANSACTION:
                statement = new BeginTransactionStatement(tableSpace);
                break;
            default:
                statement = null;
                break;
        }
        if (statement == null) {
            ByteBuf error = PduCodec.ErrorResponse.write(message.messageId, "unknown txcommand type:" + type);
            channel.sendReplyMessage(message.messageId, error);
            message.close();
            return;
        }

        CompletableFuture<StatementExecutionResult> res = server
                .getManager()
                .executeStatementAsync(statement, StatementEvaluationContext.DEFAULT_EVALUATION_CONTEXT(), transactionContext);
        res.whenComplete((result, err) -> {
            try {
                if (err != null) {
                    // the failure may reach us wrapped by the CompletableFuture machinery: classify and
                    // report the real cause, the same way handleExecuteStatement does
                    while (err instanceof CompletionException) {
                        err = err.getCause();
                    }
                    boolean expected = err instanceof NotLeaderException
                            || err instanceof StatementExecutionException;
                    if (!expected) {
                        LOGGER.log(Level.SEVERE, "unexpected error on tx command: ", err);
                    }
                    ByteBuf error = composeErrorResponse(message.messageId, err);
                    channel.sendReplyMessage(message.messageId, error);
                } else if (result instanceof TransactionResult) {
                    TransactionResult txresult = (TransactionResult) result;
                    ByteBuf response = PduCodec.TxCommandResult.write(message.messageId, txresult.transactionId);
                    channel.sendReplyMessage(message.messageId, response);
                } else {
                    ByteBuf error = PduCodec.ErrorResponse.write(message.messageId, "unknown result type:" + result);
                    channel.sendReplyMessage(message.messageId, error);
                }
            } finally {
                message.close();
            }
        });
    }

    /**
     * Processes a SASL token sent by the client during an authentication exchange that has already
     * been started. When the exchange completes the connection is marked as authenticated and the
     * SASL server is dropped.
     * <p>
     * NOTE(review): unlike other handlers in this class this method does not close the message;
     * confirm whether the caller is responsible for that.
     *
     * @param message the request carrying the token
     * @param channel the channel used to send the reply
     */
    private void handleSaslTokenMessage(Pdu message, Channel channel) {
        try {
            if (saslNettyServer == null) {
                // no exchange in progress on this connection
                channel.sendReplyMessage(message.messageId,
                        PduCodec.ErrorResponse.write(message.messageId, "Authentication failed (SASL protocol error)"));
                return;
            }
            byte[] clientToken = PduCodec.SaslTokenMessageToken.readToken(message);
            byte[] serverToken = saslNettyServer.response(clientToken);
            ByteBuf challenge = PduCodec.SaslTokenServerResponse.write(message.messageId, serverToken);
            if (saslNettyServer.isComplete()) {
                username = saslNettyServer.getUserName();
                authenticated = true;
                LOGGER.log(Level.INFO, "client {0} connected as {1}", new Object[]{this.channel.getRemoteAddress(), username});
                saslNettyServer = null;
            }
            channel.sendReplyMessage(message.messageId, challenge);
        } catch (Exception err) {
            final ByteBuf failure;
            if (err instanceof javax.security.sasl.SaslException) {
                // SASL failures get a fixed message instead of the error details
                LOGGER.log(Level.SEVERE, "SASL error " + err, err);
                failure = PduCodec.ErrorResponse.write(message.messageId, "Authentication failed (SASL error)");
            } else {
                LOGGER.log(Level.SEVERE, "Bad auth error " + err, err);
                failure = composeErrorResponse(message.messageId, err);
            }
            channel.sendReplyMessage(message.messageId, failure);
        }
    }

    /**
     * Processes the first message of a SASL exchange: creates the SASL server for the requested
     * mechanism if there is none yet, evaluates the (possibly missing) initial token and replies with
     * the server token. When the exchange completes the connection is marked as authenticated and the
     * SASL server is dropped.
     * <p>
     * NOTE(review): this method does not close the message; confirm whether the caller is
     * responsible for that.
     *
     * @param message the request carrying mechanism and token
     * @param channel the channel used to send the reply
     */
    private void handleSaslTokenMessageRequest(Pdu message, Channel channel) {
        try {
            String mech = PduCodec.SaslTokenMessageRequest.readMech(message);
            byte[] received = PduCodec.SaslTokenMessageRequest.readToken(message);
            // a missing token is evaluated as an empty one
            byte[] token = received != null ? received : new byte[0];
            if (saslNettyServer == null) {
                saslNettyServer = new SaslNettyServer(server, mech);
            }
            byte[] serverToken = saslNettyServer.response(token);
            if (saslNettyServer.isComplete()) {
                username = saslNettyServer.getUserName();
                authenticated = true;
                LOGGER.log(Level.INFO, "client {0} connected as {1}", new Object[]{this.channel.getRemoteAddress(), username});
                saslNettyServer = null;
            }
            channel.sendReplyMessage(message.messageId,
                    PduCodec.SaslTokenServerResponse.write(message.messageId, serverToken));
        } catch (Exception err) {
            channel.sendReplyMessage(message.messageId, composeErrorResponse(message.messageId, err));
        }
    }

    /**
     * Callback invoked when the channel is closed: releases the scanners held by this connection and
     * notifies the server. The event is logged only for non-local channels.
     *
     * @param channel the channel that has been closed
     */
    @Override
    public void channelClosed(Channel channel) {
        final boolean remote = !channel.isLocalChannel();
        if (remote) {
            LOGGER.log(Level.INFO, "channelClosed {0}", this);
        }
        this.freeResources();
        this.server.connectionClosed(this);
    }

    /**
     * Closes every scanner registered on this connection and then empties the registry.
     */
    private void freeResources() {
        for (ServerSideScannerPeer scanner : scanners.values()) {
            scanner.close();
        }
        scanners.clear();
    }

    /**
     * Builds a description of this connection from its id (as text), connection timestamp, username
     * and address.
     *
     * @return a new {@link ConnectionsInfo.ConnectionInfo}
     */
    ConnectionsInfo.ConnectionInfo toConnectionInfo() {
        final String idAsText = id + "";
        return new ConnectionsInfo.ConnectionInfo(idAsText, connectionTs, username, address);
    }

    /**
     * Gives access to the scanners registered on this connection.
     *
     * @return the internal map itself, not a copy
     */
    public ConcurrentMap<Long, ServerSideScannerPeer> getScanners() {
        return this.scanners;
    }

    /**
     * @return a description of this peer made of id, channel, address and username
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("ServerSideConnectionPeer{");
        text.append("id=").append(id);
        text.append(", channel=").append(channel);
        text.append(", address=").append(address);
        text.append(", username=").append(username);
        text.append('}');
        return text.toString();
    }

    /**
     * Executes a scan in "local" mode and wraps the resulting {@link DataScanner} into a
     * {@link ScanResultSet}.
     * The {@code usePreparedStatement} and {@code fetchSize} arguments are not used by this
     * implementation.
     *
     * @param tableSpace tablespace handed to the planner
     * @param query SQL text, must not be null
     * @param usePreparedStatement not used here
     * @param parameters statement parameters, normalized before being handed to the planner
     * @param txId id used to build the {@link TransactionContext} of the execution
     * @param maxRows value handed to the planner
     * @param fetchSize not used here
     * @param keepReadLocks value set on the evaluation context via {@code setForceRetainReadLock}
     * @return the result set over the scanner returned by the execution
     * @throws HDBException if the query is null, the plan is not a scan, or the execution fails
     */
    public ScanResultSet executeScan(String tableSpace, String query, boolean usePreparedStatement, List<Object> parameters, long txId, int maxRows, int fetchSize, boolean keepReadLocks) throws HDBException {

        if (query == null) {
            throw new HDBException("bad query null");
        }

        parameters = PduCodec.normalizeParametersList(parameters);

        try {
            TranslatedQuery translated = server
                    .getManager()
                    .getPlanner()
                    .translate(tableSpace, query, parameters, true, true, false, maxRows);
            translated.context.setForceRetainReadLock(keepReadLocks);

            if (LOGGER.isLoggable(Level.FINEST)) {
                LOGGER.log(Level.FINEST, "{0} -> {1}", new Object[]{query, translated.plan.mainStatement});
            }

            Statement mainStatement = translated.plan.mainStatement;
            boolean scan = mainStatement instanceof SQLPlannedOperationStatement
                    || mainStatement instanceof ScanStatement;
            if (!scan) {
                throw new HDBException("unsupported query type for scan " + query + ": PLAN is " + translated.plan);
            }

            TransactionContext transactionContext = new TransactionContext(txId);
            ScanResult scanResult = (ScanResult) server.getManager()
                    .executePlan(translated.plan, translated.context, transactionContext);
            return new LocalClientScanResultSetImpl(scanResult.dataScanner);
        } catch (HerdDBInternalException err) {
            boolean badQuery = err.getCause() instanceof ValidationException;
            if (badQuery) {
                // no stacktraces for bad queries
                LOGGER.log(Level.FINE, "SQL error on scanner: " + err);
            } else {
                LOGGER.log(Level.SEVERE, "error on scanner: " + err, err);
            }
            throw new HDBException(err);
        }
    }

    /**
     * Executes the same statement once per entry of the batch, in "local" mode.
     * The statements are run one after the other: each execution is started from the completion
     * callback of the previous one and uses the transaction id reported by the previous result.
     * The calling thread blocks until the whole chain completes or fails.
     *
     * @param tableSpace tablespace handed to the planner
     * @param query SQL text, must not be null
     * @param transactionId id used to build the {@link TransactionContext} of the first execution
     * @param returnValues when true, and an execution reports a key, the key and the new value
     * (if any) are returned for that entry
     * @param originalBatch one list of parameters per execution; each list is normalized before use
     * @return one result per executed statement, all of them carrying the transaction id reported by
     * the last execution; an empty list if the batch is empty
     * @throws HDBException if the query is null, or wrapping the failure of planning or execution
     */
    public List<DMLResult> executeUpdates(String tableSpace, String query, long transactionId, boolean returnValues, List<List<Object>> originalBatch) throws HDBException {
        if (query == null) {
            throw new HDBException("bad query null");
        }
        if (originalBatch.isEmpty()) {
            // nothing to execute: there is no first statement to start the chain with
            return new ArrayList<>();
        }

        List<List<Object>> batch = new ArrayList<>(originalBatch.size());
        for (List<Object> originalParameters : originalBatch) {
            batch.add(PduCodec.normalizeParametersList(originalParameters));
        }
        try {

            // every statement is planned up front, before starting any execution
            List<TranslatedQuery> queries = new ArrayList<>(batch.size());
            for (List<Object> parameters : batch) {
                TranslatedQuery translatedQuery = server
                        .getManager()
                        .getPlanner().translate(tableSpace, query,
                                parameters, false, true, returnValues, -1);
                queries.add(translatedQuery);
            }

            List<Long> updateCounts = new CopyOnWriteArrayList<>();
            List<Map<RawString, Object>> otherDatas = new CopyOnWriteArrayList<>();
            // completed with the final transaction id, or exceptionally on the first failure
            CompletableFuture<Long> finalResult = new CompletableFuture<>();
            class ComputeNext implements BiConsumer<StatementExecutionResult, Throwable> {

                // 1-based position of the statement whose result this callback receives
                private final int current;

                public ComputeNext(int current) {
                    this.current = current;
                }

                @Override
                public void accept(StatementExecutionResult result, Throwable error) {
                    if (error != null) {
                        finalResult.completeExceptionally(error);
                        return;
                    }
                    try {
                        if (result instanceof DMLStatementExecutionResult) {
                            DMLStatementExecutionResult dml = (DMLStatementExecutionResult) result;
                            final Map<RawString, Object> otherData;
                            if (returnValues && dml.getKey() != null) {
                                TranslatedQuery translatedQuery = queries.get(current - 1);
                                Statement statement = translatedQuery.plan.mainStatement;
                                // NOTE(review): executeUpdate resolves the table statement with
                                // statement.unwrap(TableAwareStatement.class); confirm whether the plain cast is enough here
                                TableAwareStatement tableStatement = (TableAwareStatement) statement;
                                Table table = server.getManager().getTableSpaceManager(statement.getTableSpace()).getTableManager(tableStatement.getTable()).getTable();
                                Object key = RecordSerializer.deserializePrimaryKey(dml.getKey(), table);
                                otherData = new HashMap<>();
                                otherData.put(RAWSTRING_KEY, key);
                                if (dml.getNewvalue() != null) {
                                    Map<String, Object> newvalue = RecordSerializer.toBean(new Record(dml.getKey(), dml.getNewvalue()), table);
                                    newvalue.forEach((k, v) -> {
                                        otherData.put(RawString.of(k), v);
                                    });
                                }
                            } else {
                                otherData = Collections.emptyMap();
                            }
                            updateCounts.add((long) dml.getUpdateCount());
                            otherDatas.add(otherData);
                        } else if (result instanceof DDLStatementExecutionResult) {
                            // DDL statements are reported with an update count of 1 and no values
                            Map<RawString, Object> otherData = Collections.emptyMap();
                            updateCounts.add(1L);
                            otherDatas.add(otherData);
                        } else {
                            finalResult.completeExceptionally(new Exception("bad result type " + result.getClass() + " (" + result + ")"));
                            return;
                        }

                        long newTransactionId = result.transactionId;
                        if (current == queries.size()) {
                            // that was the last statement of the batch
                            finalResult.complete(newTransactionId);
                            return;
                        }

                        TranslatedQuery nextPlannedQuery = queries.get(current);
                        TransactionContext transactionContext = new TransactionContext(newTransactionId);
                        CompletableFuture<StatementExecutionResult> nextPromise =
                                server.getManager().executePlanAsync(nextPlannedQuery.plan, nextPlannedQuery.context, transactionContext);
                        nextPromise.whenComplete(new ComputeNext(current + 1));
                    } catch (Throwable t) {
                        // whatever goes wrong in this callback must reach the thread blocked on
                        // finalResult.get(), otherwise that thread would wait forever
                        finalResult.completeExceptionally(t);
                    }
                }
            }

            TransactionContext transactionContext = new TransactionContext(transactionId);
            TranslatedQuery firstTranslatedQuery = queries.get(0);
            server.getManager().executePlanAsync(firstTranslatedQuery.plan, firstTranslatedQuery.context, transactionContext)
                    .whenComplete(new ComputeNext(1));

            long finalTransactionId = finalResult.get();
            List<DMLResult> returnedValues = new ArrayList<>(updateCounts.size());
            for (int i = 0; i < updateCounts.size(); i++) {
                Map<RawString, Object> values = otherDatas.get(i);
                returnedValues.add(new DMLResult(updateCounts.get(i), values.get(RAWSTRING_KEY), values, finalTransactionId));
            }

            return returnedValues;

        } catch (HerdDBInternalException err) {
            throw new HDBException(err);
        } catch (InterruptedException err) {
            Thread.currentThread().interrupt();
            throw new HDBException(err);
        } catch (ExecutionException err) {
            throw new HDBException(err.getCause());
        }
    }

}
